﻿/**************************************************************
 * Copyright gt1987. All rights reserved.
 * 
 * Author: guitao(guitao@eastmoney.com) 
 * Create Date: 2020/5/8 21:39:39
 * Description: TaskSchedulerManager
 *          
 * Revision History:
 *      Date         Author               Description
 *              
***************************************************************/

using gt.TaskScheduler.Core.Components;
using log4net;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace gt.TaskScheduler.Core.Impl
{
    /// <summary>
    /// 集群节点 任务调度管理器
    /// </summary>
    public class TaskSchedulerMember : ITaskSchedulerMember
    {
        private ITaskSchedulerMananger _manager;
        private ITaskHandlerProvider _handlerProvider;
        private int _threadsCount = 1;
        private string _name;
        private ITaskSchedulerLeader _leader;

        public TaskSchedulerMember(
            ITaskSchedulerUniqueIdGenerator uniqueIdGenerator,
            ITaskSchedulerMananger manager,
            ITaskHandlerProvider handlerProvider,
            int threads)
        {
            _name = uniqueIdGenerator.GetUniqueName();
            _manager = manager;
            _handlerProvider = handlerProvider;
            _threadsCount = threads;

            _manager.RegisterMember(_name);

            LogHelper.GetLogger().Info($"task scheduler member: {_name} register.");
        }

        /// <summary>
        /// 节点名称
        /// </summary>
        public string Name { get { return _name; } }
        /// <summary>
        /// 启动并运行任务处理
        /// </summary>
        public void Run()
        {
            //等待Leader初始化完成
            if (!WaitLeaderInitialization())
            {
                LogHelper.GetLogger().Error("wait task scheduler leader failed.i am quit!");
                return;
            }
            try
            {
                Task[] tasks = new Task[_threadsCount];
                for (int i = 0; i < _threadsCount; i++)
                {
                    tasks[i] = Task.Factory.StartNew(() =>
                    {
                        while (true)
                        {
                            string taskName = string.Empty;
                            try
                            {
                                //获取task队列key
                                taskName = _manager.DequeueTaskName();
                                //TODO:如果这里获取taskName异常，会造成while循环不能停止
                                if (string.IsNullOrEmpty(taskName))
                                {
                                    LogHelper.GetLogger().Info("task scheduler queue is empty.stop run.");
                                    break;
                                }

                                LogHelper.GetLogger().Info($"get taskName: {taskName}");

                                var handler = _handlerProvider.GetHandler(taskName);

                                if (handler == null)
                                {
                                    LogHelper.GetLogger().Error($"can not get taskHandler by taskName: {taskName},continue next.");
                                    continue;
                                }

                                handler.Execute(taskName).GetAwaiter().GetResult();

                                LogHelper.GetLogger().Info($"finished taskName: {taskName},continue next");
                            }
                            catch (Exception ex)
                            {
                                LogHelper.GetLogger().Error($"taskName: {taskName} handle failed", ex);
                            }
                        }
                    }, TaskCreationOptions.LongRunning);
                }

                _manager.MarkMemberStatus(_name, MemberStatus.Run);

                Task.WaitAll(tasks);
            }
            catch (Exception ex)
            {
                LogHelper.GetLogger().Error(ex);
            }
            finally
            {
                LogHelper.GetLogger().Info("member stop.");
                _manager.MarkMemberStatus(_name, MemberStatus.Stop);
                if (_leader != null)
                {
                    LogHelper.GetLogger().Info("i am leader.ready to monitor all member status.");
                    _leader.MonitTaskSchedulerMembers().Wait(TimeSpan.FromMinutes(60));
                }
            }
        }
        /// <summary>
        /// 设置为主节点
        /// </summary>
        /// <param name="leader"></param>
        public void SetAsLeader(ITaskSchedulerLeader leader)
        {
            _leader = leader;
        }

        /// <summary>
        /// 等待Leader完成初始化
        /// </summary>
        private bool WaitLeaderInitialization()
        {
            var task = Task.Run(() =>
            {
                while (true)
                {
                    if (_manager.CheckTaskSchedulerRunning())
                    {
                        break;
                    }
                    Thread.Sleep(500);
                }
            });

            return task.Wait(TimeSpan.FromSeconds(10));
        }
    }
}